{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**_pySpark Basics: Merging and Joining Data_**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_by Jeff Levy (jlevy@urban.org)_\n",
    "\n",
    "_Last Updated: 31 Jul 2017, Spark v2.1_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_Abstract: This guide will go over the various ways to concatenate two or more dataframes_\n",
    "\n",
    "_Main operations used: unionAll, join_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "***"
   ]
  },
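  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We begin with some basic setup: the examples below assume an active Spark session, with its SparkContext available as `sc` (as it is in the `pyspark` shell), to supply the SQL structure that supports the dataframes we'll be using."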
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Stacking Rows with Matching Columns\n",
    "\n",
    "You may have the same columns in each dataframe and just want to stack one on top of the other, row-wise.  We can make this happen with a helper function, after we first build three simple toy dataframes:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "\n",
    "row = Row(\"name\", \"pet\", \"count\")\n",
    "\n",
    "df1 = sc.parallelize([\n",
    "    row(\"Sue\", \"cat\", 16),\n",
    "    row(\"Kim\", \"dog\", 1),    \n",
    "    row(\"Bob\", \"fish\", 5)\n",
    "    ]).toDF()\n",
    "\n",
    "df2 = sc.parallelize([\n",
    "    row(\"Fred\", \"cat\", 2),\n",
    "    row(\"Kate\", \"ant\", 179),    \n",
    "    row(\"Marc\", \"lizard\", 5)\n",
    "    ]).toDF()\n",
    "\n",
    "df3 = sc.parallelize([\n",
    "    row(\"Sarah\", \"shark\", 3),\n",
    "    row(\"Jason\", \"kids\", 2),    \n",
    "    row(\"Scott\", \"squirrel\", 1)\n",
    "    ]).toDF()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If we just want to stack two of them, we can use `unionAll` (also available as `union` since Spark 2.0), which matches columns by position rather than by name:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_union = df1.unionAll(df2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+------+-----+\n",
      "|name|   pet|count|\n",
      "+----+------+-----+\n",
      "| Sue|   cat|   16|\n",
      "| Kim|   dog|    1|\n",
      "| Bob|  fish|    5|\n",
      "|Fred|   cat|    2|\n",
      "|Kate|   ant|  179|\n",
      "|Marc|lizard|    5|\n",
      "+----+------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_union.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `unionAll` method only allows us to stack two dataframes at a time.  We could call it repeatedly if there were more than two to stack in this way, but a helper function makes it easier.  \n",
    "\n",
    "The standard Python function `reduce` applies a two-argument function cumulatively to a list of items in order to \"reduce\" it down to one output.  With this you can pass as many dataframes as you like into our helper function and they will come out stacked in one:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import DataFrame\n",
    "from functools import reduce\n",
    "\n",
    "def union_many(*dfs):\n",
    "    # this function accepts as many dataframes as you want to pass into it;\n",
    "    # the asterisk before the name `dfs` tells Python to collect all of the\n",
    "    # positional arguments passed to union_many into a tuple called `dfs`\n",
    "    \n",
    "    return reduce(DataFrame.unionAll, dfs)\n",
    "\n",
    "df_union = union_many(df1, df2, df3)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+--------+-----+\n",
      "| name|     pet|count|\n",
      "+-----+--------+-----+\n",
      "|  Sue|     cat|   16|\n",
      "|  Kim|     dog|    1|\n",
      "|  Bob|    fish|    5|\n",
      "| Fred|     cat|    2|\n",
      "| Kate|     ant|  179|\n",
      "| Marc|  lizard|    5|\n",
      "|Sarah|   shark|    3|\n",
      "|Jason|    kids|    2|\n",
      "|Scott|squirrel|    1|\n",
      "+-----+--------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_union.show()"
   ]
  },
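  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `reduce` call inside `union_many` can be seen in isolation with a minimal sketch in plain Python that needs no Spark session.  Ordinary lists stand in for dataframes here, and the hypothetical helper `stack_two` stands in for `DataFrame.unionAll`; both are assumptions made only for illustration:\n",
    "\n",
    "```python\n",
    "from functools import reduce\n",
    "\n",
    "def stack_two(top, bottom):\n",
    "    # stand-in for DataFrame.unionAll: put the rows of one list under the other\n",
    "    return top + bottom\n",
    "\n",
    "rows1 = [('Sue', 'cat', 16)]\n",
    "rows2 = [('Fred', 'cat', 2)]\n",
    "rows3 = [('Sarah', 'shark', 3)]\n",
    "\n",
    "# reduce works left to right: stack_two(stack_two(rows1, rows2), rows3)\n",
    "stacked = reduce(stack_two, [rows1, rows2, rows3])\n",
    "print(stacked)\n",
    "```\n",
    "\n",
    "The call expands into one pairwise stack after another, which is the same pattern `union_many` relies on with `unionAll`."
   ]
  },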
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Merging Columns by Matching Rows\n",
    "\n",
    "The other way to merge is by combining columns on certain keys across rows.  If you are familiar with SQL, pySpark (and Pandas for non-distributed data) draws its merging terminology from that.  If you are coming from Stata, this is generally a more intuitive way to think about many-to-one, one-to-one and many-to-many merges.  \n",
    "\n",
    "After we build our data there are four ways to specify the logic of the operation:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "row1 = Row(\"name\", \"pet\", \"count\")\n",
    "row2 = Row(\"name\", \"pet2\", \"count2\")\n",
    "\n",
    "df1 = sc.parallelize([\n",
    "    row1(\"Sue\", \"cat\", 16),\n",
    "    row1(\"Kim\", \"dog\", 1),    \n",
    "    row1(\"Bob\", \"fish\", 5),\n",
    "    row1(\"Libuse\", \"horse\", 1)\n",
    "    ]).toDF()\n",
    "\n",
    "df2 = sc.parallelize([\n",
    "    row2(\"Sue\", \"eagle\", 2),\n",
    "    row2(\"Kim\", \"ant\", 179),    \n",
    "    row2(\"Bob\", \"lizard\", 5),\n",
    "    row2(\"Ferdinand\", \"bees\", 23)\n",
    "    ]).toDF()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First we'll do an **inner join**, which *merges rows that have a match in both dataframes* and drops all others.  This is the default type of join, so the `how` argument could be omitted here if you didn't wish to be explicit (being explicit is almost always better, however).  We will merge on the entries in the `name` column, which is passed as the second argument to the `join` method; this can also be a `list` of column names if the merge should happen on more than one column:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+----+-----+------+------+\n",
      "|name| pet|count|  pet2|count2|\n",
      "+----+----+-----+------+------+\n",
      "| Sue| cat|   16| eagle|     2|\n",
      "| Bob|fish|    5|lizard|     5|\n",
      "| Kim| dog|    1|   ant|   179|\n",
      "+----+----+-----+------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df1.join(df2, 'name', how='inner').show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The \"left\" dataframe here is `df1`, the \"right\" dataframe is `df2` - the names simply describe their relative locations in the line of code.  Notice that the entries for Libuse and Ferdinand are dropped, because they do not appear in *both* dataframes.\n",
    "\n",
    "An **outer join**, which *uses all rows from both dataframes regardless of matches*, fills in `null` for missing observations.  Using the same two dataframes:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+-----+-----+------+------+\n",
      "|     name|  pet|count|  pet2|count2|\n",
      "+---------+-----+-----+------+------+\n",
      "|      Sue|  cat|   16| eagle|     2|\n",
      "|Ferdinand| null| null|  bees|    23|\n",
      "|      Bob| fish|    5|lizard|     5|\n",
      "|      Kim|  dog|    1|   ant|   179|\n",
      "|   Libuse|horse|    1|  null|  null|\n",
      "+---------+-----+-----+------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df1.join(df2, 'name', how='outer').show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Libuse and Ferdinand both made it into the output now, but each has `null` values filled in where necessary.\n",
    "\n",
    "Next, a **left join** uses *all keys from the left dataframe* (in this case `df1`).  Data from the right dataframe only shows up if it matches something in the left:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------+-----+-----+------+------+\n",
      "|  name|  pet|count|  pet2|count2|\n",
      "+------+-----+-----+------+------+\n",
      "|   Sue|  cat|   16| eagle|     2|\n",
      "|   Bob| fish|    5|lizard|     5|\n",
      "|   Kim|  dog|    1|   ant|   179|\n",
      "|Libuse|horse|    1|  null|  null|\n",
      "+------+-----+-----+------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df1.join(df2, 'name', how='left').show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "So the entry for Ferdinand was dropped because it has no match in the left dataframe.  \n",
    "\n",
    "A **right join** would just be the opposite of that, and would drop Libuse but keep Ferdinand with `null` entries where necessary.  A `right` join is equivalent to performing a `left` join but switching the places of `df1` and `df2` in the code, that is: \n",
    "\n",
    "    df2.join(df1, 'name', how='left')\n",
    "\n",
    "is logically the same, apart from the order of the output columns, as:\n",
    "\n",
    "    df1.join(df2, 'name', how='right')"
   ]
  },
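  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To compare the four join types side by side, here is a rough sketch in plain Python that needs no Spark session.  Dictionaries keyed by `name` stand in for `df1` and `df2`, and the helpers `join_keys` and `join_rows` are hypothetical names invented for this illustration; this shows only which rows survive each join, not how Spark carries a join out:\n",
    "\n",
    "```python\n",
    "# plain dictionaries keyed by name stand in for df1 and df2\n",
    "left = {'Sue': ('cat', 16), 'Kim': ('dog', 1),\n",
    "        'Bob': ('fish', 5), 'Libuse': ('horse', 1)}\n",
    "right = {'Sue': ('eagle', 2), 'Kim': ('ant', 179),\n",
    "         'Bob': ('lizard', 5), 'Ferdinand': ('bees', 23)}\n",
    "\n",
    "def join_keys(left, right, how):\n",
    "    # hypothetical helper: which names survive each type of join\n",
    "    if how == 'inner':\n",
    "        keys = set(left) & set(right)   # names present in both\n",
    "    elif how == 'outer':\n",
    "        keys = set(left) | set(right)   # names present in either\n",
    "    elif how == 'left':\n",
    "        keys = set(left)                # every name from the left\n",
    "    elif how == 'right':\n",
    "        keys = set(right)               # every name from the right\n",
    "    else:\n",
    "        raise ValueError('unknown join type: ' + how)\n",
    "    # sorted only to make the output repeatable\n",
    "    return sorted(keys)\n",
    "\n",
    "def join_rows(left, right, how):\n",
    "    # build merged rows, using None where Spark would show null\n",
    "    empty = (None, None)\n",
    "    return [(name,) + left.get(name, empty) + right.get(name, empty)\n",
    "            for name in join_keys(left, right, how)]\n",
    "\n",
    "for how in ['inner', 'outer', 'left', 'right']:\n",
    "    print(join_keys(left, right, how))\n",
    "```\n",
    "\n",
    "Calling `join_rows(left, right, 'left')` keeps Libuse with `None` in the right-hand columns and leaves Ferdinand out, mirroring the left join output shown earlier."
   ]
  },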
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
